package com.king.first.app.windows;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class Sliding_Windows {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop01", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapStream = socketTextStream.flatMap((s, collector) -> {
            String[] s1 = s.split(" ");
            Arrays.stream(s1).forEach(word -> collector.collect(Tuple2.of(word, 1)));
        });

        KeyedStream<Tuple2<String, Integer>, String> keyedStream = flatMapStream.keyBy(tuple2 -> tuple2.f0);

        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(6), Time.seconds(2)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = windowedStream.sum(1);
        sum.print();

        env.execute();
    }
}
